Kafka Rest Consumer性能优化

之前有多语言consume kafka的需求,因此决定采用Confluent的Kafka Rest2.0.1。虽然测试的结果不如人意(java consumer大约60M/s,但用rest的话只有5M/s),考虑到消息量不大,也能凑合用

但最近在kafka集群中新创建了200个测试的空topic,rest consumer的性能急剧降低,惨不忍睹。。。

下载了源码debug了一下,发现每次consume都需要判断一下topic是否存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public List<Topic> getTopics() {
try {
Seq<String> topicNames = zkUtils.getAllTopics().sorted(Ordering.String$.MODULE$);
return getTopicsData(topicNames);
} catch (RestNotFoundException e) {
throw new InternalServerErrorException(e);
}
}

public boolean topicExists(String topicName) {
List<Topic> topics = getTopics();
for (Topic topic : topics) {
if (topic.getName().equals(topicName)) {
return true;
}
}
return false;
}

MetadataObserver.getTopics()耗时非常长,该函数先从zookeeper中取出所有的topicName,然后再到zookeeper中把所有的topic的metaData查询出来封装成Topic对象然后返回。增加了200个topic之后,获取所有topic的metaData变得非常的耗时间,其实这是完全没有必要的,直接将Seq topicNames返回就行了,因为判断topic是否存在只需要比较topicName。

因此代码改成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Collection<String> getTopicNames() {
Seq<String> topicNames = zkUtils.getAllTopics().sorted(Ordering.String$.MODULE$);
return JavaConversions.asJavaCollection(topicNames);
}

public boolean topicExists(String topicName) {
List<String> topics = Lists.newArrayList(getTopicNames());
for (String topic : topics) {
if (topic.equals(topicName)) {
return true;
}
}
return false;
}

那么,rest consumer为什么每次消费都需要先判断Topic是否存在呢?

rest是无状态的,但kafka consumer必须要保存状态,因此必须在rest服务器端缓存consumer instances,每个consume rest请求都是通过缓存的instance向kafka server请求数据。

rest consumer为了限制每次请求返回的数据量,避免客户端内存爆掉,提供了consumer.request.timeout.ms参数,默认值1000ms,意味着无论instance是否从kafka server端拿到了数据,1秒钟后必须给rest 客户端响应,如果没拿到数据,就返回空数组。

那么问题来了,假如topic不存在,consumer instance会获取topic的metadata超时,但在超时之前,就已经超过了1秒,空数组会被返回给rest client。事实上,rest client的每次消费请求都会返回空数组,但它只知道没有拿到数据,并不知道是topic不存在,还是真的没有数据被produce进来。

为了避免这种情况发生,必须处理每个consume请求时先判断topic是否存在。

顺带说一句,java client也应该提供判断topic是否存在的api啊。。。不能让client等这么久最后返回个failed to update metadata after 60000ms…能不能友好一点。。

———————————- 我是分割线 ——————————————–

注:该BUG在3.0版本已被官方修复。